Crate crossbeam_channel[−][src]
Multi-producer multi-consumer channels for message passing.
Crossbeam's channels are an alternative to the std::sync::mpsc
channels provided by the
standard library. They are an improvement in terms of performance, ergonomics, and features.
Here's a quick example:
use crossbeam_channel as channel; // Create a channel of unbounded capacity. let (s, r) = channel::unbounded(); // Send a message into the channel. s.send("Hello world!"); // Receive the message from the channel. assert_eq!(r.recv(), Some("Hello world!"));
Types of channels
Channels are created using two functions:
-
bounded
creates a channel of bounded capacity, i.e. there is a limit to how many messages it can hold. -
unbounded
creates a channel of unbounded capacity, i.e. it can contain arbitrary number of messages at any time.
Both functions return two handles: a sender and a receiver. Senders and receivers represent two opposite sides of a channel. Messages are sent into the channel using senders and received using receivers.
Creating a bounded channel:
use crossbeam_channel as channel; // Create a channel that can hold at most 5 messages at a time. let (s, r) = channel::bounded(5); // Can only send 5 messages without blocking. for i in 0..5 { s.send(i); } // Another call to `send` would block because the channel is full. // s.send(5);
Creating an unbounded channel:
use crossbeam_channel as channel; // Create an unbounded channel. let (s, r) = channel::unbounded(); // Can send any number of messages into the channel without blocking. for i in 0..1000 { s.send(i); }
A rather special case is a bounded, zero-capacity channel. This kind of channel cannot hold any messages at all! In order to send a message through the channel, a sending thread and a receiving thread have to pair up at the same time:
use std::thread; use crossbeam_channel as channel; // Create a zero-capacity channel. let (s, r) = channel::bounded(0); // Spawn a thread that sends a message into the channel. // Sending blocks until a receive operation appears on the other side. thread::spawn(move || s.send("Hi!")); // Receive the message. // Receiving blocks until a send operation appears on the other side. assert_eq!(r.recv(), Some("Hi!"));
Sharing channels
Senders and receivers can be either shared by reference or cloned and then sent to other threads. There can be multiple senders and multiple receivers associated with the same channel.
Sharing by reference:
extern crate crossbeam; use crossbeam_channel as channel; let (s, r) = channel::unbounded(); crossbeam::scope(|scope| { // Spawn a thread that sends one message and then receives one. scope.spawn(|| { s.send(1); r.recv().unwrap(); }); // Spawn another thread that does the same thing. scope.spawn(|| { s.send(2); r.recv().unwrap(); }); });
Sharing by cloning:
use std::thread; use crossbeam_channel as channel; let (s1, r1) = channel::unbounded(); let (s2, r2) = (s1.clone(), r1.clone()); // Spawn a thread that sends one message and then receives one. thread::spawn(move || { s1.send(1); r1.recv().unwrap(); }); // Spawn another thread that receives a message and then sends one. thread::spawn(move || { r2.recv().unwrap(); s2.send(2); });
Note that cloning only creates a new reference to the same sending or receiving side. Cloning does not create a new channel.
Closing channels
When all senders associated with a channel get dropped, the channel becomes closed. No more messages can be sent, but any remaining messages can still be received. Receive operations on a closed channel never block, even if the channel is empty.
use crossbeam_channel as channel; let (s, r) = channel::unbounded(); s.send(1); s.send(2); s.send(3); // The only sender is dropped, closing the channel. drop(s); // The remaining messages can be received. assert_eq!(r.recv(), Some(1)); assert_eq!(r.recv(), Some(2)); assert_eq!(r.recv(), Some(3)); // There are no more messages in the channel. assert!(r.is_empty()); // Note that calling `r.recv()` will not block. // Instead, `None` is returned immediately. assert_eq!(r.recv(), None);
Blocking and non-blocking operations
Sending a message into a full bounded channel will block until an empty slot in the channel becomes available. Sending into an unbounded channel never blocks because there is always enough space in it. Zero-capacity channels are always empty, and sending blocks until a receive operation appears on the other side of the channel.
Receiving from an empty channel blocks until a message is sent into the channel or the channel becomes closed. Zero-capacity channels are always empty, and receiving blocks until a send operation appears on the other side of the channel or it becomes closed.
There is also a non-blocking method try_recv
, which receives a message if it is immediately
available, or returns None
otherwise.
use crossbeam_channel as channel; let (s, r) = channel::bounded(1); // Send a message into the channel. s.send("foo"); // This call would block because the channel is full. // s.send("bar"); // Receive the message. assert_eq!(r.recv(), Some("foo")); // This call would block because the channel is empty. // r.recv(); // Try receiving a message without blocking. assert_eq!(r.try_recv(), None); // Close the channel. drop(s); // This call doesn't block because the channel is now closed. assert_eq!(r.recv(), None);
For greater control over blocking, consider using the select!
macro.
Iteration
A channel is a special kind of iterator, where items can be dynamically produced by senders and
consumed by receivers. Indeed, Receiver
implements the Iterator
trait, and calling
next
is equivalent to calling recv
.
use std::thread; use crossbeam_channel as channel; let (s, r) = channel::unbounded(); thread::spawn(move || { s.send(1); s.send(2); s.send(3); // `s` was moved into the closure so now it gets dropped, // thus closing the channel. }); // Collect all messages from the channel. // // Note that the call to `collect` blocks until the channel becomes // closed and empty, i.e. until `r.next()` returns `None`. let v: Vec<_> = r.collect(); assert_eq!(v, [1, 2, 3]);
Select
The select!
macro allows declaring a set of channel operations and blocking until any one
of them becomes ready. Finally, one of the operations is executed. If multiple operations
are ready at the same time, a random one is chosen. It is also possible to declare a default
case that gets executed if none of the operations are initially ready.
An example of receiving a message from two channels, whichever becomes ready first:
use std::thread; use crossbeam_channel as channel; let (s1, r1) = channel::unbounded(); let (s2, r2) = channel::unbounded(); thread::spawn(move || s1.send("foo")); thread::spawn(move || s2.send("bar")); // Only one of these two receive operations will be executed. select! { recv(r1, msg) => assert_eq!(msg, Some("foo")), recv(r2, msg) => assert_eq!(msg, Some("bar")), }
For more details, take a look at the documentation for select!
.
If you need to dynamically add cases rather than define them statically inside the macro, use
Select
instead.
Frequently asked questions
How to try receiving a message, but also check whether the channel is empty or closed?
Use the select!
macro:
use crossbeam_channel as channel; let (s, r) = channel::unbounded(); s.send("hello"); select! { recv(r, msg) => match msg { Some(msg) => println!("received {:?}", msg), None => println!("the channel is closed"), } default => println!("the channel is empty"), }
How to try sending a message without blocking when the channel is full?
Use the select!
macro:
use crossbeam_channel as channel; let (s, r) = channel::bounded(1); s.send("first"); select! { send(s, "second") => println!("message sent"), default => println!("the channel is full"), }
How to try sending/receiving a message with a timeout?
Function after
creates a special kind of channel that delivers a message after the
specified timeout. Use select!
to wait until a message is sent/received or the timeout
is fired:
use std::time::Duration; use crossbeam_channel as channel; let (s, r) = channel::bounded(1); s.send("hello"); let timeout = Duration::from_millis(100); select! { recv(r, msg) => match msg { Some(msg) => println!("received {:?}", msg), None => println!("the channel is closed"), } recv(channel::after(timeout)) => println!("timed out; the channel is still empty"), }
Macros
select |
Waits on a set of channel operations. |
Structs
Receiver |
The receiving side of a channel. |
Select |
Waits on a set of channel operations. |
Sender |
The sending side of a channel. |
Functions
after |
Creates a receiver that delivers a message after a certain duration of time. |
bounded |
Creates a channel of bounded capacity. |
tick |
Creates a receiver that delivers messages periodically. |
unbounded |
Creates a channel of unbounded capacity. |